Kafka Consumer Session Timeout

The following errors showed up in the Kafka client (consumer) log:

[13/06/16 16:01:26:153 PM CST] 550 ERROR ConsumerCoordinator: Error ILLEGAL_GENERATION occurred while committing offsets for group group7
[13/06/16 16:01:26:154 PM CST] 424 WARN ConsumerCoordinator: Auto offset commit failed: Commit cannot be completed due to group rebalance

The server-side (broker) log shows:

[2016-06-13 04:01:16,458] INFO [GroupCoordinator 1]: Preparing to restabilize group group7 with old generation 0 (kafka.coordinator.GroupCoordinator)
[2016-06-13 04:01:16,458] INFO [GroupCoordinator 1]: Stabilized group group7 generation 1 (kafka.coordinator.GroupCoordinator)
[2016-06-13 04:01:16,462] INFO [GroupCoordinator 1]: Assignment received from leader for group group7 for generation 1 (kafka.coordinator.GroupCoordinator)
[2016-06-13 04:01:46,469] INFO [GroupCoordinator 1]: Preparing to restabilize group group7 with old generation 1 (kafka.coordinator.GroupCoordinator)
[2016-06-13 04:01:46,470] INFO [GroupCoordinator 1]: Group group7 generation 1 is dead and removed (kafka.coordinator.GroupCoordinator)

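A Kafka consumer group has the notion of a generation. Once a group starts consuming, it is tagged as generation G. If generation G expires for some reason, the group moves on to generation G+1. A member still holding generation G that then tries to commit offsets gets the error shown at the top of this post.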
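The generation check can be pictured with a small toy model. This is only a sketch of the idea, not Kafka's coordinator code: the class GenerationModel and its methods are hypothetical names invented for illustration.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of generation checking. Hypothetical; not Kafka's actual coordinator. */
final class GenerationModel {
    enum CommitResult { OK, ILLEGAL_GENERATION }

    private int generation = 0;
    private final Map<Integer, Long> committedOffsets = new HashMap<>();

    /** A rebalance or a session timeout moves the group to the next generation. */
    int bumpGeneration() {
        return ++generation;
    }

    /** Accept the commit only if the member's generation matches the group's current one. */
    CommitResult commit(int memberGeneration, int partition, long offset) {
        if (memberGeneration != generation) {
            return CommitResult.ILLEGAL_GENERATION;
        }
        committedOffsets.put(partition, offset);
        return CommitResult.OK;
    }

    /** Last accepted offset for the partition, or null if none was accepted. */
    Long committed(int partition) {
        return committedOffsets.get(partition);
    }

    public static void main(String[] args) {
        GenerationModel group = new GenerationModel();
        int joinedAt = group.bumpGeneration();               // consumer joins in generation 1
        System.out.println(group.commit(joinedAt, 0, 100L)); // prints OK
        group.bumpGeneration();                              // generation 1 expires
        System.out.println(group.commit(joinedAt, 0, 200L)); // prints ILLEGAL_GENERATION
    }
}
```

The stale commit is rejected and the previously accepted offset (100) stays in place, which is the behaviour the ILLEGAL_GENERATION error in the client log reflects.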

So what causes a generation to expire?

  • Rebalance

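After a rebalance, the assignment of partitions to consumers has clearly changed. If the generation did not expire, a consumer could commit offsets for partitions it no longer owns.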

  • session timeout

Every call to consumer.poll sends a heartbeat to the server. If the server receives no heartbeat within session.timeout.ms (30s by default), it considers the consumer disconnected or shut down.

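Investigation showed that in our case each poll returned more than ten thousand records, and processing them took roughly 60s. By the time the consumer issued its next poll and committed offsets, the server had already expired the group's generation, hence the error.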
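The arithmetic behind this can be sketched as follows. The figures are rounded assumptions based on the numbers above (exactly 10,000 records and exactly 60s per batch), and the class PollBudget, its methods, and the 50% safety factor are hypothetical choices for illustration.

```java
/** Back-of-the-envelope batch sizing. Hypothetical helper, not part of any Kafka API. */
final class PollBudget {
    /** Estimated time to process one batch, in milliseconds. */
    static long batchMillis(int records, double millisPerRecord) {
        return Math.round(records * millisPerRecord);
    }

    /** Largest batch that fits in the given fraction of the session timeout. */
    static int maxRecords(long sessionTimeoutMs, double millisPerRecord, double safetyFactor) {
        return (int) Math.floor(sessionTimeoutMs * safetyFactor / millisPerRecord);
    }

    public static void main(String[] args) {
        double perRecord = 60_000.0 / 10_000;                    // assumed: 6 ms per record
        System.out.println(batchMillis(10_000, perRecord));      // 60000, twice the 30s timeout
        System.out.println(maxRecords(30_000, perRecord, 0.5));  // 2500
        System.out.println(batchMillis(2_000, perRecord));       // 12000
    }
}
```

Under these assumptions a full batch takes about twice the 30s session timeout, while a batch of 2000 records would take about 12s and leave comfortable headroom.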

The fix is to raise the consumer's session.timeout.ms to 120s, or to cap each poll at 2000 records with max.poll.records (available since Kafka 0.10.0).
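A minimal sketch of how these two settings might be put into a consumer configuration. The broker address and the String deserializers are placeholder assumptions, and ConsumerTimeoutConfig is a hypothetical helper. The code builds only the java.util.Properties object; passing it to a KafkaConsumer requires the kafka-clients library on the classpath.

```java
import java.util.Properties;

/** Builds consumer settings for the two fixes discussed here. Hypothetical helper class. */
final class ConsumerTimeoutConfig {
    static Properties build(int sessionTimeoutMs, int maxPollRecords) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder address
        props.setProperty("group.id", "group7");                   // group name from the logs
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");   // assumed type
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");   // assumed type
        // Fix 1: give the consumer more time between heartbeats.
        props.setProperty("session.timeout.ms", Integer.toString(sessionTimeoutMs));
        // Fix 2: bound the number of records a single poll may return.
        props.setProperty("max.poll.records", Integer.toString(maxPollRecords));
        return props;
    }

    public static void main(String[] args) {
        Properties props = build(120_000, 2_000);
        System.out.println(props.getProperty("session.timeout.ms")); // prints 120000
        System.out.println(props.getProperty("max.poll.records"));   // prints 2000
    }
}
```

Either setting alone may be enough, as noted above; both are set here only to show the keys. A larger session.timeout.ms also has to fall within the range the broker allows (group.min.session.timeout.ms to group.max.session.timeout.ms), so the broker settings may need checking as well.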